Skip to content

Commit

Permalink
Merge pull request #1665 from pkuehnel/feat/improveWebSocketConnection
Browse files Browse the repository at this point in the history
feat(FleetTelemetryWebSocketService): detect server heartbeat and use installation id on connection
  • Loading branch information
pkuehnel authored Nov 29, 2024
2 parents de3c778 + 7666ea6 commit 4536421
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ public class DtoFleetTelemetryWebSocketClients
{
public string Vin { get; set; }
public ClientWebSocket WebSocketClient { get; set; }
public DateTime LastReceivedHeartbeat { get; set; }
public CancellationToken CancellationToken { get; set; }
}
29 changes: 19 additions & 10 deletions TeslaSolarCharger/Server/Services/FleetTelemetryWebSocketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public class FleetTelemetryWebSocketService(
IConfigurationWrapper configurationWrapper,
IDateTimeProvider dateTimeProvider,
IServiceProvider serviceProvider,
ISettings settings) : IFleetTelemetryWebSocketService
ISettings settings,
ITscConfigurationService tscConfigurationService) : IFleetTelemetryWebSocketService
{
private readonly TimeSpan _heartbeatsendTimeout = TimeSpan.FromSeconds(5);

Expand Down Expand Up @@ -52,7 +53,13 @@ public async Task ReconnectWebSocketsForEnabledCars()
var existingClient = Clients.FirstOrDefault(c => c.Vin == car.Vin);
if (existingClient != default)
{
if (existingClient.WebSocketClient.State == WebSocketState.Open)
var currentTime = dateTimeProvider.UtcNow();
//When intervall is changed, change it also in the server WebSocketConnectionHandlingService.SendHeartbeatsTask
var serverSideHeartbeatIntervall = TimeSpan.FromSeconds(54);
var additionalIntervallbuffer = TimeSpan.FromSeconds(30);
var maxLastHeartbeatAge = serverSideHeartbeatIntervall + additionalIntervallbuffer;
var earliestPossibleLastHeartbeat = currentTime - maxLastHeartbeatAge;
if ((existingClient.WebSocketClient.State == WebSocketState.Open) && (existingClient.LastReceivedHeartbeat > earliestPossibleLastHeartbeat))
{
var segment = new ArraySegment<byte>(bytesToSend);
try
Expand All @@ -72,7 +79,7 @@ await existingClient.WebSocketClient.SendAsync(segment, WebSocketMessageType.Tex
continue;
}

logger.LogInformation("Websocket Client for car {vin} is not open. Disposing client", car.Vin);
logger.LogInformation("Websocket Client for car {vin} is not open or last heartbeat is too old. Disposing client", car.Vin);
existingClient.WebSocketClient.Dispose();
Clients.Remove(existingClient);
}
Expand Down Expand Up @@ -114,9 +121,9 @@ private async Task ConnectToFleetTelemetryApi(string vin, bool useFleetTelemetry
logger.LogError("Can not connect to WebSocket: No token found for car {vin}", vin);
return;
}

var installationId = await tscConfigurationService.GetInstallationId().ConfigureAwait(false);
var url = configurationWrapper.FleetTelemetryApiUrl() +
$"teslaToken={token.AccessToken}&region={token.Region}&vin={vin}&forceReconfiguration=false&includeLocation={useFleetTelemetryForLocationData}";
$"teslaToken={token.AccessToken}&region={token.Region}&vin={vin}&forceReconfiguration=false&includeLocation={useFleetTelemetryForLocationData}&installationId={installationId}";
using var client = new ClientWebSocket();
try
{
Expand All @@ -127,6 +134,7 @@ private async Task ConnectToFleetTelemetryApi(string vin, bool useFleetTelemetry
Vin = vin,
WebSocketClient = client,
CancellationToken = cancellation.Token,
LastReceivedHeartbeat = currentDate,
};
Clients.Add(dtoClient);
var carId = await context.Cars
Expand All @@ -135,7 +143,7 @@ private async Task ConnectToFleetTelemetryApi(string vin, bool useFleetTelemetry
.FirstOrDefaultAsync().ConfigureAwait(false);
try
{
await ReceiveMessages(client, dtoClient.CancellationToken, dtoClient.Vin, carId).ConfigureAwait(false);
await ReceiveMessages(dtoClient, dtoClient.Vin, carId).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -157,22 +165,22 @@ await client.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing",
}
}

private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken ctx, string vin, int carId)
private async Task ReceiveMessages(DtoFleetTelemetryWebSocketClients client, string vin, int carId)
{
logger.LogTrace("{method}(webSocket, ctx, {vin}, {carId})", nameof(ReceiveMessages), vin, carId);
var buffer = new byte[1024 * 4]; // Buffer to store incoming data
while (webSocket.State == WebSocketState.Open)
while (client.WebSocketClient.State == WebSocketState.Open)
{
try
{
// Receive message from the WebSocket server
logger.LogTrace("Waiting for new fleet telemetry message for car {vin}", vin);
var result = await webSocket.ReceiveAsync(new(buffer), ctx);
var result = await client.WebSocketClient.ReceiveAsync(new(buffer), client.CancellationToken);
logger.LogTrace("Received new fleet telemetry message for car {vin}", vin);
if (result.MessageType == WebSocketMessageType.Close)
{
// If the server closed the connection, close the WebSocket
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, result.CloseStatusDescription, ctx);
await client.WebSocketClient.CloseAsync(WebSocketCloseStatus.NormalClosure, result.CloseStatusDescription, client.CancellationToken);
logger.LogInformation("WebSocket connection closed by server.");
}
else
Expand All @@ -182,6 +190,7 @@ private async Task ReceiveMessages(ClientWebSocket webSocket, CancellationToken
if (jsonMessage == "Heartbeat")
{
logger.LogTrace("Received heartbeat: {message}", jsonMessage);
client.LastReceivedHeartbeat = dateTimeProvider.UtcNow();
continue;
}

Expand Down

0 comments on commit 4536421

Please sign in to comment.