Skip to content

Commit

Permalink
Merge pull request #2731 from headlamp-k8s/fix-websocket-race-condition
Browse files Browse the repository at this point in the history
backend: prevent panic in WebSocket multiplexer
  • Loading branch information
illume authored Jan 6, 2025
2 parents 447bdbd + 9c0d977 commit 4558def
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 14 deletions.
17 changes: 16 additions & 1 deletion backend/cmd/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,14 @@ func (m *Multiplexer) sendIfNewResourceVersion(

// sendCompleteMessage sends a COMPLETE message to the client.
func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *websocket.Conn) error {
conn.mu.RLock()
if conn.closed {
conn.mu.RUnlock()
return nil // Connection is already closed, no need to send message
}

conn.mu.RUnlock()

completeMsg := Message{
ClusterID: conn.ClusterID,
Path: conn.Path,
Expand All @@ -593,7 +601,14 @@ func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *websocke
conn.writeMu.Lock()
defer conn.writeMu.Unlock()

return clientConn.WriteJSON(completeMsg)
err := clientConn.WriteJSON(completeMsg)
if err != nil {
logger.Log(logger.LevelInfo, nil, err, "connection closed while writing complete message")

return nil // Just return nil for any error - connection is dead anyway
}

return nil
}

// sendDataMessage sends the actual data message to the client.
Expand Down
62 changes: 61 additions & 1 deletion backend/cmd/multiplexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,67 @@ func TestSendCompleteMessage_ClosedConnection(t *testing.T) {
// Test with closed connection
clientConn.Close()
err = m.sendCompleteMessage(conn, clientConn)
assert.Error(t, err)
assert.NoError(t, err)
}

func TestSendCompleteMessage_ErrorConditions(t *testing.T) {
tests := []struct {
name string
setupConn func(*Connection, *websocket.Conn)
expectedError bool
}{
{
name: "connection already marked as closed",
setupConn: func(conn *Connection, _ *websocket.Conn) {
conn.closed = true
},
expectedError: false,
},
{
name: "normal closure",
setupConn: func(_ *Connection, clientConn *websocket.Conn) {
//nolint:errcheck
clientConn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
clientConn.Close()
},
expectedError: false,
},
{
name: "unexpected close error",
setupConn: func(_ *Connection, clientConn *websocket.Conn) {
//nolint:errcheck
clientConn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseProtocolError, ""))
clientConn.Close()
},
expectedError: false, // All errors return nil now
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := NewMultiplexer(kubeconfig.NewContextStore())
clientConn, clientServer := createTestWebSocketConnection()
defer clientServer.Close()

conn := &Connection{
ClusterID: "test-cluster",
Path: "/api/v1/pods",
UserID: "test-user",
Query: "watch=true",
}

tt.setupConn(conn, clientConn)
err := m.sendCompleteMessage(conn, clientConn)

if tt.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func createMockKubeAPIServer() *httptest.Server {
Expand Down
9 changes: 9 additions & 0 deletions frontend/src/lib/k8s/api/v2/KubeList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ export const KubeList = {
update: KubeListUpdateEvent<ObjectInterface>,
itemClass: ObjectClass
): KubeList<KubeObject<ObjectInterface>> {
// Skip if the update's resource version is older than or equal to what we have
if (
list.metadata.resourceVersion &&
update.object.metadata.resourceVersion &&
parseInt(update.object.metadata.resourceVersion) <= parseInt(list.metadata.resourceVersion)
) {
return list;
}

const newItems = [...list.items];
const index = newItems.findIndex(item => item.metadata.uid === update.object.metadata.uid);

Expand Down
8 changes: 2 additions & 6 deletions frontend/src/lib/k8s/api/v2/useKubeObjectList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,8 @@ function useWatchKubeObjectListsMultiplexed<K extends KubeObject>({
return lists.map(list => {
const key = `${list.cluster}:${list.namespace || ''}`;

// Update resource version if newer one is available
const currentVersion = latestResourceVersions.current[key];
const newVersion = list.resourceVersion;
if (!currentVersion || parseInt(newVersion) > parseInt(currentVersion)) {
latestResourceVersions.current[key] = newVersion;
}
// Always use the latest resource version from the server
latestResourceVersions.current[key] = list.resourceVersion;

// Construct WebSocket URL with current parameters
return {
Expand Down
6 changes: 0 additions & 6 deletions frontend/src/lib/k8s/api/v2/webSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,6 @@ export const WebSocketManager = {
// Handle COMPLETE messages
if (data.type === 'COMPLETE') {
this.completedPaths.add(key);
return;
}

// Skip if path is already completed
if (this.completedPaths.has(key)) {
return;
}

// Parse and validate update data
Expand Down

0 comments on commit 4558def

Please sign in to comment.