Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Initial WebSocket protocol implementation #73

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
19 changes: 19 additions & 0 deletions src/Bedrock.Framework/Protocols/WebSockets/IControlFrameHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Bedrock.Framework.Protocols.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace Bedrock.Framework.Protocols.WebSockets
{
/// <summary>
/// Handles WebSocket control frames encountered by a WebSocketMessageReader.
/// </summary>
public interface IControlFrameHandler
{
/// <summary>
/// Handles a WebSocket control frame.
/// </summary>
/// <param name="controlFrame">The control frame to handle.</param>
/// <param name="cancellationToken">A cancellation token, if any.</param>
ValueTask HandleControlFrameAsync(WebSocketControlFrame controlFrame, CancellationToken cancellationToken = default);
}
}
48 changes: 48 additions & 0 deletions src/Bedrock.Framework/Protocols/WebSockets/MessageReadResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;

namespace Bedrock.Framework.Protocols.WebSockets
{
/// <summary>
/// A result from reading from a WebSocket message.
/// </summary>
public readonly struct MessageReadResult
{
/// <summary>
/// True if the data is the final data in the message, false otherwise.
/// </summary>
public bool IsEndOfMessage { get; }

/// <summary>
/// True if the underlying transport read was canceled, false otherwise.
/// </summary>
public bool IsCanceled { get; }

/// <summary>
/// True if the underlying transport is completed, false otherwise.
/// </summary>
public bool IsCompleted { get; }

/// <summary>
/// The data read from the WebSocket.
/// </summary>
public ReadOnlySequence<byte> Data { get; }

/// <summary>
/// Creates an instance of a MessageReadResult.
/// </summary>
/// <param name="data">The data read from the WebSocket.</param>
/// <param name="isEndOfMessage">True if the data is the final data in the message, false otherwise.</param>
/// <param name="isCanceled">True if the underlying transport read was canceled, false otherwise.</param>
/// <param name="isCompleted">True if the underlying transport is completed, false otherwise.</param>
public MessageReadResult(ReadOnlySequence<byte> data, bool isEndOfMessage, bool isCanceled, bool isCompleted)
{
Data = data;
IsEndOfMessage = isEndOfMessage;
IsCanceled = isCanceled;
IsCompleted = isCompleted;
}
}
}
65 changes: 65 additions & 0 deletions src/Bedrock.Framework/Protocols/WebSockets/WebSocketCloseStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Bedrock.Framework.Protocols.WebSockets
{
/// <summary>
/// The status of the closing of a WebSocket.
/// </summary>
public enum WebSocketCloseStatus : short
{
/// <summary>
/// A normal closure, for which the purpose for the connection has been fulfilled.
/// </summary>
Normal = 1000,

/// <summary>
/// Indicates that an endpoint is going away, such as a server going down or a
/// browser navigating away.
/// </summary>
GoingAway = 1001,

/// <summary>
/// Indicates that an endpoint is terminating the connection due to a protocol error.
/// </summary>
ProtocolError = 1002,

/// <summary>
/// Indicates that an endpoint is terminating the connection because it has received
/// a type of data it cannot accept.
/// </summary>
UnacceptableData = 1003,

/// <summary>
/// Indicates that an endpoint is terminating the connection because it has received
/// data within a message that was not consistent with the type of the message.
/// </summary>
IncorrectDataType = 1007,

/// <summary>
/// Indicates that an endpoint is terminating the connection because it has received
/// a message that violates its policy.
/// </summary>
PolicyViolation = 1008,

/// <summary>
/// Indicates that an endpoint is terminating the connection because it has received
/// a message that is too big for it to process.
/// </summary>
MessageTooLarge = 1009,

/// <summary>
/// Indicates that an endpoint (client) is terminating the connection because it has
/// expected the server to negotiate one or more extensions, but the server didn't
/// return them in the response message of the WebSocket handshake.
/// </summary>
ExpectedExtensionNotFound = 1010,

/// <summary>
/// Indicates that a server is terminating the connection because it encountered
/// an unexpected condition that prevented it from fulfilling the request.
/// </summary>
UnexpectedError = 1011
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Bedrock.Framework.Protocols.WebSockets;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;

namespace Bedrock.Framework.Protocols.WebSockets
{
/// <summary>
/// A WebSocket control frame.
/// </summary>
public class WebSocketControlFrame
{
/// <summary>
/// The opcode of the control frame.
/// </summary>
public WebSocketOpcode Opcode { get; }

/// <summary>
/// The close status, if provided and if the frame is a close frame.
/// </summary>
public WebSocketCloseStatus CloseStatus { get; }

/// <summary>
/// The payload of the control frame.
/// </summary>
public ReadOnlySequence<byte> Payload { get; }

/// <summary>
/// Creates an instance of a WebSocketControlFrame.
/// </summary>
/// <param name="opcode">The opcode of the control frame.</param>
/// <param name="closeStatus">The close status, if provided and if the frame is a close frame.</param>
/// <param name="payload">The payload of the control frame.</param>
public WebSocketControlFrame(WebSocketOpcode opcode, WebSocketCloseStatus closeStatus = default, ReadOnlySequence<byte> payload = default)
{
Opcode = opcode;
CloseStatus = closeStatus;
Payload = payload;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Bedrock.Framework.Protocols;
using Bedrock.Framework.Protocols.WebSockets;
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Text;

namespace Bedrock.Framework.Protocols.WebSockets
{
/// <summary>
/// Reads a WebSocket control frame payload.
/// </summary>
public readonly struct WebSocketControlFrameReader : IMessageReader<WebSocketControlFrame>
{
/// <summary>
/// The header of the WebSocket control frame.
/// </summary>
private readonly WebSocketHeader _header;

/// <summary>
/// A payload reader instance for the control frame.
/// </summary>
private readonly WebSocketPayloadReader _payloadReader;

/// <summary>
/// Creates an instance of a WebSocketControlFrameReader.
/// </summary>
/// <param name="header">The header of the WebSocket control frame.</param>
public WebSocketControlFrameReader(WebSocketHeader header)
{
_header = header;
_payloadReader = new WebSocketPayloadReader(header);
}

/// <summary>
/// Attempts to parse a WebSocket control frame payload from a sequence.
/// </summary>
/// <param name="input">The input sequence to parse from.</param>
/// <param name="consumed">The position in the sequence that has been consumed.</param>
/// <param name="examined">The position in the sequence that has been examined.</param>
/// <param name="message">The returned WebSocket control frame message.</param>
/// <returns>True if the message could be parsed, false otherwise.</returns>
public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePosition consumed, ref SequencePosition examined, out WebSocketControlFrame message)
{
if ((ulong)input.Length < _header.PayloadLength)
{
message = default;
return false;
}

_payloadReader.TryParseMessage(input, ref consumed, ref examined, out var output);
Debug.Assert(_payloadReader.BytesRemaining == 0);

if (_header.Opcode == WebSocketOpcode.Close && output.Length >= 2)
{
var closeStatus = (WebSocketCloseStatus)BinaryPrimitives.ReadInt16BigEndian(output.FirstSpan);
message = new WebSocketControlFrame(_header.Opcode, closeStatus, output.Slice(2));
}
else
{
message = new WebSocketControlFrame(_header.Opcode, default, output);
}

return true;
}
}
}
89 changes: 70 additions & 19 deletions src/Bedrock.Framework/Protocols/WebSockets/WebSocketFrameReader.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
using Bedrock.Framework.Protocols;
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;

namespace Bedrock.Framework.Protocols.WebSockets
{
/// <summary>
/// An implementation of IMessageReader that parses WebSocket message frames.
/// </summary>
public struct WebSocketFrameReader : IMessageReader<WebSocketReadFrame>
public class WebSocketFrameReader : IMessageReader<WebSocketReadFrame>
{
/// <summary>
/// An instance of the WebSocketFrameReader.
/// </summary>
private WebSocketPayloadReader _payloadReader;

/// <summary>
/// Attempts to parse a message from a sequence.
/// </summary>
Expand All @@ -21,17 +28,59 @@ public struct WebSocketFrameReader : IMessageReader<WebSocketReadFrame>
/// <returns>True if parsed successfully, false otherwise.</returns>
public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePosition consumed, ref SequencePosition examined, out WebSocketReadFrame message)
{
var reader = new SequenceReader<byte>(input);

//We need to at least be able to read the start of frame header
if (input.Length < 2)
{
message = default;
return false;
}

reader.TryRead(out var finOpcodeByte);
reader.TryRead(out var maskLengthByte);
if (input.IsSingleSegment || input.FirstSpan.Length >= 14)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to encapsulate this logic in a helper (it's impossible with stackalloc though).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following you, what's a helper in this context and why do we need to?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A method in the framework that can get you a span of bytes given a minimum length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I gotcha. Yeah, it's hard to picture a way of implementing that without having to either go to the pool or allocate a byte array, because you can't leak the stackalloc'd span.

{
if (TryParseSpan(input.FirstSpan, input.Length, out var bytesRead, out message))
{
consumed = input.GetPosition(bytesRead);
examined = consumed;

return true;
}

return false;

}
else
{
Span<byte> tempSpan = stackalloc byte[14];

var bytesToCopy = Math.Min(input.Length, tempSpan.Length);
input.Slice(0, bytesToCopy).CopyTo(tempSpan);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this fail with ArgumentException that bytesToCopy is outside the range if input has length of less than 14 bytes?
I think there should be a check somewhere right?


if (TryParseSpan(tempSpan, input.Length, out var bytesRead, out message))
{
consumed = input.GetPosition(bytesRead);
examined = consumed;

return true;
}

return false;
}
}

/// <summary>
/// Attempts to parse a span for a WebSocket frame header.
/// </summary>
/// <param name="span">The span to attempt to parse.</param>
/// <param name="inputLength">The input sequence length.</param>
/// <param name="bytesRead">The number of bytes read from the span.</param>
/// <param name="message">The WebSocketReadFrame read from the span.</param>
/// <returns>True if the span could be parsed, false otherwise.</returns>
private bool TryParseSpan(in ReadOnlySpan<byte> span, long inputLength, out int bytesRead, out WebSocketReadFrame message)
{
bytesRead = 0;

var finOpcodeByte = span[0];
var maskLengthByte = span[1];

var masked = (maskLengthByte & 0b1000_0000) != 0;
ulong initialPayloadLength = (ulong)(maskLengthByte & 0b0111_1111);
Expand All @@ -49,25 +98,23 @@ public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePositio
break;
}

if (reader.Remaining < extendedPayloadLengthSize + maskSize)
if (inputLength < extendedPayloadLengthSize + maskSize + 2)
{
message = default;
return false;
}

var fin = (finOpcodeByte & 0b1000_0000) != 0;
var opcode = (WebSocketOpcode)(finOpcodeByte & 0b0000_1111);

ulong payloadLength = 0;
if (extendedPayloadLengthSize == 2)
{
reader.TryReadBigEndian(out short length);
payloadLength = (ushort)length;
payloadLength = BinaryPrimitives.ReadUInt16BigEndian(span.Slice(2));
}
else if (extendedPayloadLengthSize == 8)
{
reader.TryReadBigEndian(out long length);
payloadLength = (ulong)length;
payloadLength = BinaryPrimitives.ReadUInt64BigEndian(span.Slice(2));
}
else
{
Expand All @@ -77,18 +124,22 @@ public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePositio
int maskingKey = 0;
if (masked)
{
Span<byte> maskBytes = stackalloc byte[sizeof(int)];
reader.TryCopyTo(maskBytes);

maskingKey = BitConverter.ToInt32(maskBytes);
reader.Advance(sizeof(int));
maskingKey = BinaryPrimitives.ReadInt32LittleEndian(span.Slice(2 + extendedPayloadLengthSize));
}

var header = new WebSocketHeader(fin, opcode, masked, payloadLength, maskingKey);
message = new WebSocketReadFrame(header, new WebSocketPayloadReader(header));

consumed = input.GetPosition(2 + extendedPayloadLengthSize + maskSize);
examined = consumed;
if(_payloadReader == null)
{
_payloadReader = new WebSocketPayloadReader(header);
}
else
{
_payloadReader.Reset(header);
}

message = new WebSocketReadFrame(header, _payloadReader);
bytesRead = 2 + extendedPayloadLengthSize + maskSize;
return true;
}
}
Expand Down
Loading