Skip to content

Commit

Permalink
Cleanup and modularize receive path, improve timestamp support [20/x]
Browse files Browse the repository at this point in the history
Summary:
This diff renames `ReceivedPacket` to `ReceivedUdpPacket` to clarify that it maps to a UDP packet and not a QUIC packet. A single UDP packet can contain multiple QUIC packets due to coalescing.

--

This diff is part of a larger stack focused on the following:

- **Cleaning up client and server UDP packet receive paths while improving testability.** We currently have multiple receive paths for client and server. Capabilities vary significantly and there are few tests. For instance:
  - The server receive path supports socket RX timestamps, abet incorrectly in that it does not store timestamp per packet. In comparison, the client receive path does not currently support socket RX timestamps, although the code in `QuicClientTransport::recvmsg` and `QuicClientTransport::recvmmsg` makes reference to socket RX timestamps, making it confusing to understand the capabilities available when tracing through the code. This complicates the tests in `QuicTypedTransportTests`, as we have to disable test logic that depends on socket RX timestamps for client tests.
  - The client currently has three receive paths, and none of them are well tested.

- **Modularize and abstract components in the receive path.** This will make it easier to mock/fake the UDP socket and network layers.
  - `QuicClientTransport` and `QuicServerTransport` currently contain UDP socket handling logic that operates over lower layer primitives such `cmsg` and `io_vec` (see `QuicClientTransport::recvmmsg` and `...::recvmsg` as examples).
  - Because this UDP socket handling logic is inside of the mvfst transport implementations, it is difficult to test this logic in isolation and mock/fake the underlying socket and network layers. For instance, injecting a user space network emulator that operates at the socket layer would require faking `folly::AsyncUDPSocket`, which is non-trivial given that `AsyncUDPSocket` does not abstract away intricacies arising from the aforementioned lower layer primitives.
  - By shifting this logic into an intermediate layer between the transport and the underlying UDP socket, it will be easier to mock out the UDP socket layer when testing functionality at higher layers, and inject fake components when we want to emulate the network between a mvfst client and server. It will also be easier for us to have unit tests focused on testing interactions between the UDP socket implementation and this intermediate layer.

- **Improving receive path timestamping.** We only record a single timestamp per `NetworkData` at the moment, but (1) it is possible for a `NetworkData` to have multiple packets, each with their own timestamps, and (2) we should be able to record both userspace and socket timestamps.

Reviewed By: silver23arrow

Differential Revision: D48788809

fbshipit-source-id: 3793c30212d545e226f3e5337289bc2601dfa553
  • Loading branch information
Brandon Schlinker authored and facebook-github-bot committed Nov 28, 2023
1 parent d9f0575 commit 83ad2ad
Show file tree
Hide file tree
Showing 24 changed files with 130 additions and 119 deletions.
4 changes: 2 additions & 2 deletions quic/api/QuicTransportBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1886,8 +1886,8 @@ void QuicTransportBase::onNetworkData(
.setNumPacketsReceived(networkData.getPackets().size())
.setNumBytesReceived(networkData.getTotalData());
for (auto& packet : networkData.getPackets()) {
builder.addReceivedPacket(
SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket::
builder.addReceivedUdpPacket(
SocketObserverInterface::PacketsReceivedEvent::ReceivedUdpPacket::
Builder()
.setPacketReceiveTime(packet.timings.receiveTimePoint)
.setPacketNumBytes(packet.buf->computeChainDataLength())
Expand Down
2 changes: 1 addition & 1 deletion quic/api/QuicTransportBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class QuicTransportBase : public QuicSocket, QuicStreamPrioritiesObserver {
*/
virtual void onReadData(
const folly::SocketAddress& peer,
ReceivedPacket&& udpPacket) = 0;
ReceivedUdpPacket&& udpPacket) = 0;

/**
* Invoked when we have to write some data to the wire.
Expand Down
2 changes: 1 addition & 1 deletion quic/api/test/QuicTransportBaseTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class TestQuicTransport
return lossTimeout_.getTimeRemaining();
}

void onReadData(const folly::SocketAddress&, ReceivedPacket&& udpPacket)
void onReadData(const folly::SocketAddress&, ReceivedUdpPacket&& udpPacket)
override {
if (!udpPacket.buf) {
return;
Expand Down
20 changes: 10 additions & 10 deletions quic/api/test/QuicTypedTransportTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1372,10 +1372,10 @@ struct AckEventMatcherBuilder {
};

template <typename T>
struct ReceivedPacketMatcherBuilder {
using Builder = ReceivedPacketMatcherBuilder;
struct ReceivedUdpPacketMatcherBuilder {
using Builder = ReceivedUdpPacketMatcherBuilder;
using Obj =
quic::SocketObserverInterface::PacketsReceivedEvent::ReceivedPacket;
quic::SocketObserverInterface::PacketsReceivedEvent::ReceivedUdpPacket;
Builder&& setExpectedPacketReceiveTime(
const TimePoint expectedPacketReceiveTime) {
maybeExpectedPacketReceiveTime = expectedPacketReceiveTime;
Expand Down Expand Up @@ -1411,7 +1411,7 @@ struct ReceivedPacketMatcherBuilder {
FAIL(); // unhandled typed test
}
}
explicit ReceivedPacketMatcherBuilder() = default;
explicit ReceivedUdpPacketMatcherBuilder() = default;

folly::Optional<TimePoint> maybeExpectedPacketReceiveTime;
folly::Optional<uint64_t> maybeExpectedPacketNumBytes;
Expand Down Expand Up @@ -4619,7 +4619,7 @@ TYPED_TEST(
testing::Field(&Event::receivedPackets, testing::SizeIs(1)),
testing::Field(
&Event::receivedPackets,
testing::ElementsAre(ReceivedPacketMatcherBuilder<TypeParam>()
testing::ElementsAre(ReceivedUdpPacketMatcherBuilder<TypeParam>()
.setExpectedPacketReceiveTime(pkt1RecvTime)
.setExpectedPacketNumBytes(pkt1NumBytes)
.build())));
Expand All @@ -4645,7 +4645,7 @@ TYPED_TEST(
testing::Field(&Event::receivedPackets, testing::SizeIs(1)),
testing::Field(
&Event::receivedPackets,
testing::ElementsAre(ReceivedPacketMatcherBuilder<TypeParam>()
testing::ElementsAre(ReceivedUdpPacketMatcherBuilder<TypeParam>()
.setExpectedPacketReceiveTime(pkt2RecvTime)
.setExpectedPacketNumBytes(pkt2NumBytes)
.build())));
Expand Down Expand Up @@ -4716,12 +4716,12 @@ TYPED_TEST(
&Event::receivedPackets,
testing::ElementsAre(
// pkt1
ReceivedPacketMatcherBuilder<TypeParam>()
ReceivedUdpPacketMatcherBuilder<TypeParam>()
.setExpectedPacketReceiveTime(pktBatch1RecvTime)
.setExpectedPacketNumBytes(pkt1NumBytes)
.build(),
// pkt2
ReceivedPacketMatcherBuilder<TypeParam>()
ReceivedUdpPacketMatcherBuilder<TypeParam>()
.setExpectedPacketReceiveTime(pktBatch1RecvTime)
.setExpectedPacketNumBytes(pkt2NumBytes)
.build())));
Expand Down Expand Up @@ -4759,12 +4759,12 @@ TYPED_TEST(
&Event::receivedPackets,
testing::ElementsAre(
// pkt1
ReceivedPacketMatcherBuilder<TypeParam>()
ReceivedUdpPacketMatcherBuilder<TypeParam>()
.setExpectedPacketReceiveTime(pktBatch2RecvTime)
.setExpectedPacketNumBytes(pkt3NumBytes)
.build(),
// pkt2
ReceivedPacketMatcherBuilder<TypeParam>()
ReceivedUdpPacketMatcherBuilder<TypeParam>()
.setExpectedPacketReceiveTime(pktBatch2RecvTime)
.setExpectedPacketNumBytes(pkt4NumBytes)
.build())));
Expand Down
2 changes: 1 addition & 1 deletion quic/api/test/TestQuicTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class TestQuicTransport

void onReadData(
const folly::SocketAddress& /* peer */,
ReceivedPacket&& /* udpPacket */) noexcept override {}
ReceivedUdpPacket&& /* udpPacket */) noexcept override {}

void writeData() override {
if (closed) {
Expand Down
25 changes: 13 additions & 12 deletions quic/client/QuicClientTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ QuicClientTransport::~QuicClientTransport() {

void QuicClientTransport::processUdpPacket(
const folly::SocketAddress& peer,
ReceivedPacket&& udpPacket) {
ReceivedUdpPacket&& udpPacket) {
// Process the arriving UDP packet, which may have coalesced QUIC packets.
{
BufQueue udpData;
Expand Down Expand Up @@ -180,7 +180,7 @@ void QuicClientTransport::processUdpPacket(

void QuicClientTransport::processUdpPacketData(
const folly::SocketAddress& peer,
const ReceivedPacket::Timings& udpPacketTimings,
const ReceivedUdpPacket::Timings& udpPacketTimings,
BufQueue& udpPacketData) {
auto packetSize = udpPacketData.chainLength();
if (packetSize == 0) {
Expand Down Expand Up @@ -268,7 +268,8 @@ void QuicClientTransport::processUdpPacketData(
? clientConn_->pendingOneRttData
: clientConn_->pendingHandshakeData;
pendingData.emplace_back(
ReceivedPacket(std::move(cipherUnavailable->packet), udpPacketTimings),
ReceivedUdpPacket(
std::move(cipherUnavailable->packet), udpPacketTimings),
peer);
if (conn_->qLogger) {
conn_->qLogger->addPacketBuffered(
Expand Down Expand Up @@ -804,7 +805,7 @@ void QuicClientTransport::processUdpPacketData(

void QuicClientTransport::onReadData(
const folly::SocketAddress& peer,
ReceivedPacket&& udpPacket) {
ReceivedUdpPacket&& udpPacket) {
if (closeState_ == CloseState::CLOSED) {
// If we are closed, then we shouldn't process new network data.
QUIC_STATS(
Expand All @@ -814,10 +815,10 @@ void QuicClientTransport::onReadData(
}
return;
}
bool waitingForFirstPacket = !hasReceivedPackets(*conn_);
bool waitingForFirstPacket = !hasReceivedUdpPackets(*conn_);
processUdpPacket(peer, std::move(udpPacket));
if (connSetupCallback_ && waitingForFirstPacket &&
hasReceivedPackets(*conn_)) {
hasReceivedUdpPackets(*conn_)) {
connSetupCallback_->onFirstPeerPacketProcessed();
}
if (!transportReadyNotified_ && hasWriteCipher()) {
Expand Down Expand Up @@ -1250,18 +1251,18 @@ void QuicClientTransport::recvMsg(

offset += params.gro;
remaining -= params.gro;
networkData.addPacket(ReceivedPacket(std::move(tmp)));
networkData.addPacket(ReceivedUdpPacket(std::move(tmp)));
} else {
// do not clone the last packet
// start at offset, use all the remaining data
readBuffer->trimStart(offset);
DCHECK_EQ(readBuffer->length(), remaining);
remaining = 0;
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer)));
}
}
} else {
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer)));
}
trackDatagramReceived(bytesRead);
}
Expand Down Expand Up @@ -1394,18 +1395,18 @@ void QuicClientTransport::recvMmsg(

offset += params.gro;
remaining -= params.gro;
networkData.addPacket(ReceivedPacket(std::move(tmp)));
networkData.addPacket(ReceivedUdpPacket(std::move(tmp)));
} else {
// do not clone the last packet
// start at offset, use all the remaining data
readBuffer->trimStart(offset);
DCHECK_EQ(readBuffer->length(), remaining);
remaining = 0;
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer)));
}
}
} else {
networkData.addPacket(ReceivedPacket(std::move(readBuffer)));
networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer)));
}

trackDatagramReceived(bytesRead);
Expand Down
9 changes: 5 additions & 4 deletions quic/client/QuicClientTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ class QuicClientTransport
}

// From QuicTransportBase
void onReadData(const folly::SocketAddress& peer, ReceivedPacket&& udpPacket)
override;
void onReadData(
const folly::SocketAddress& peer,
ReceivedUdpPacket&& udpPacket) override;
void writeData() override;
void closeTransport() override;
void unbindConnection() override;
Expand Down Expand Up @@ -246,7 +247,7 @@ class QuicClientTransport
*/
void processUdpPacket(
const folly::SocketAddress& peer,
ReceivedPacket&& udpPacket);
ReceivedUdpPacket&& udpPacket);

/**
* Process data within a single UDP packet.
Expand All @@ -268,7 +269,7 @@ class QuicClientTransport
*/
void processUdpPacketData(
const folly::SocketAddress& peer,
const ReceivedPacket::Timings& udpPacketTimings,
const ReceivedUdpPacket::Timings& udpPacketTimings,
BufQueue& udpPacketData);

void startCryptoHandshake();
Expand Down
4 changes: 2 additions & 2 deletions quic/client/state/ClientStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ namespace quic {
struct CachedServerTransportParameters;

struct PendingClientData {
ReceivedPacket udpPacket;
ReceivedUdpPacket udpPacket;
folly::SocketAddress peer;

PendingClientData(ReceivedPacket udpPacketIn, folly::SocketAddress peerIn)
PendingClientData(ReceivedUdpPacket udpPacketIn, folly::SocketAddress peerIn)
: udpPacket(std::move(udpPacketIn)), peer(std::move(peerIn)) {}
};

Expand Down
2 changes: 1 addition & 1 deletion quic/codec/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ struct ShortHeader {
bool readInitialByte(uint8_t initalByte);
bool readConnectionId(folly::io::Cursor& cursor);
bool readPacketNum(
PacketNum largestReceivedPacketNum,
PacketNum largestReceivedUdpPacketNum,
folly::io::Cursor& cursor);

private:
Expand Down
12 changes: 6 additions & 6 deletions quic/codec/test/PacketNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ namespace quic {
namespace test {

struct Packet8DecodeData {
PacketNum largestReceivedPacketNum;
PacketNum largestReceivedUdpPacketNum;
uint8_t encoded;
PacketNum expected;
};

struct Packet16DecodeData {
PacketNum largestReceivedPacketNum;
PacketNum largestReceivedUdpPacketNum;
uint16_t encoded;
PacketNum expected;
};

struct Packet32DecodeData {
PacketNum largestReceivedPacketNum;
PacketNum largestReceivedUdpPacketNum;
uint32_t encoded;
PacketNum expected;
};
Expand All @@ -43,7 +43,7 @@ TEST_P(Packet8DecodeTest, Decode) {
decodePacketNumber(
GetParam().encoded,
sizeof(GetParam().encoded),
GetParam().largestReceivedPacketNum + 1));
GetParam().largestReceivedUdpPacketNum + 1));
}

TEST_P(Packet16DecodeTest, Decode) {
Expand All @@ -52,14 +52,14 @@ TEST_P(Packet16DecodeTest, Decode) {
decodePacketNumber(
GetParam().encoded,
sizeof(GetParam().encoded),
GetParam().largestReceivedPacketNum + 1));
GetParam().largestReceivedUdpPacketNum + 1));
}

TEST_P(Packet32DecodeTest, Decode) {
auto decoded = decodePacketNumber(
GetParam().encoded,
sizeof(GetParam().encoded),
GetParam().largestReceivedPacketNum + 1);
GetParam().largestReceivedUdpPacketNum + 1);
EXPECT_EQ(GetParam().expected, decoded) << std::hex << decoded;
}

Expand Down
27 changes: 17 additions & 10 deletions quic/common/NetworkData.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

namespace quic {

struct ReceivedPacket {
/**
* Received UDP packet with timings.
*
* A single UDP packet can contain multiple QUIC packets due to UDP packet
* coalescing (see RFC 9000, section 12.2). When invoked, this function attempts
* to transform the UDP packet data into one or more QUIC packets.
*/
struct ReceivedUdpPacket {
struct Timings {
/**
* Socket timestamp with additional information.
Expand Down Expand Up @@ -47,10 +54,10 @@ struct ReceivedPacket {
folly::Optional<SocketTimestampExt> maybeSoftwareTs;
};

ReceivedPacket() = default;
explicit ReceivedPacket(Buf&& bufIn) : buf(std::move(bufIn)) {}
ReceivedPacket(Buf&& bufIn, const Timings& timingsIn)
: buf(std::move(bufIn)), timings(timingsIn) {}
ReceivedUdpPacket() = default;
explicit ReceivedUdpPacket(Buf&& bufIn) : buf(std::move(bufIn)) {}
ReceivedUdpPacket(Buf&& bufIn, Timings timingsIn)
: buf(std::move(bufIn)), timings(std::move(timingsIn)) {}

Buf buf;
Timings timings;
Expand All @@ -72,7 +79,7 @@ struct NetworkData {
const TimePoint& receiveTimePointIn)
: receiveTimePoint_(receiveTimePointIn),
packets_([&packetBufs, &receiveTimePointIn]() {
std::vector<ReceivedPacket> result;
std::vector<ReceivedUdpPacket> result;
result.reserve(packetBufs.size());
for (auto& packetBuf : packetBufs) {
result.emplace_back(std::move(packetBuf));
Expand All @@ -94,17 +101,17 @@ struct NetworkData {
packets_.reserve(size);
}

void addPacket(ReceivedPacket&& packetIn) {
void addPacket(ReceivedUdpPacket&& packetIn) {
packets_.emplace_back(std::move(packetIn));
packets_.back().timings.receiveTimePoint = receiveTimePoint_;
totalData_ += packets_.back().buf->computeChainDataLength();
}

[[nodiscard]] const std::vector<ReceivedPacket>& getPackets() const {
[[nodiscard]] const std::vector<ReceivedUdpPacket>& getPackets() const {
return packets_;
}

std::vector<ReceivedPacket> movePackets() && {
std::vector<ReceivedUdpPacket> movePackets() && {
return std::move(packets_);
}

Expand Down Expand Up @@ -137,7 +144,7 @@ struct NetworkData {

private:
TimePoint receiveTimePoint_;
std::vector<ReceivedPacket> packets_;
std::vector<ReceivedUdpPacket> packets_;
size_t totalData_{0};
};

Expand Down
Loading

0 comments on commit 83ad2ad

Please sign in to comment.