From 0a92d517617c61ac6ab2cebba4345129ae34da35 Mon Sep 17 00:00:00 2001 From: Thorsten Riess Date: Sat, 20 Feb 2021 14:46:38 +0100 Subject: [PATCH 1/2] Fix potential write to closed channel --- main.go | 14 ++++++++++++-- ws/client.go | 21 +++++++++------------ ws/hub.go | 39 ++++++++++++++++++++++++++------------- 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/main.go b/main.go index a750ad9..7578dc7 100644 --- a/main.go +++ b/main.go @@ -205,8 +205,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) { go c.WriteLoop() + c.Add(1) // Add to the hub hub.Register <- c + c.Wait() // wait until registered. // Trickle ICE. Emit server candidate to client peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { @@ -224,7 +226,11 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) { Event: ws.MessageTypeCandidate, Data: string(candidateString), }); err == nil { - c.Send <- msg + hub.RLock() + if _, ok := hub.Clients[c]; ok { + c.Send <- msg + } + hub.RUnlock() } else { log.Println(err) } @@ -262,7 +268,11 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) { Event: ws.MessageTypeOffer, Data: string(offerString), }); err == nil { - c.Send <- msg + hub.RLock() + if _, ok := hub.Clients[c]; ok { + c.Send <- msg + } + hub.RUnlock() } else { log.Printf("could not marshal ws message: %s", err) } diff --git a/ws/client.go b/ws/client.go index daa86a9..1fabb8e 100644 --- a/ws/client.go +++ b/ws/client.go @@ -3,6 +3,7 @@ package ws import ( "encoding/json" "log" + "sync" "time" "github.com/gorilla/websocket" @@ -21,6 +22,9 @@ type Client struct { // webRTC peer connection PeerConnection *webrtc.PeerConnection + + // Wait group + sync.WaitGroup } func NewClient(hub *Hub, conn *websocket.Conn, webrtcConn *webrtc.PeerConnection) *Client { @@ -88,10 +92,6 @@ func (c *Client) ReadLoop() { return } } - - // we do not send anything to the other clients! - //message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) - //c.hub.Broadcast <- message } } @@ -120,14 +120,11 @@ func (c *Client) WriteLoop() { if err != nil { return } - _, _ = w.Write(message) - - // Add queued messages to the current websocket message. - n := len(c.Send) - for i := 0; i < n; i++ { - _, _ = w.Write([]byte{'\n'}) - message = <-c.Send - _, _ = w.Write(message) + _, err = w.Write(message) + if err != nil { + log.Printf("could not send message: %s",err) + w.Close() + return } if err := w.Close(); err != nil { diff --git a/ws/hub.go b/ws/hub.go index a887580..c8ef083 100644 --- a/ws/hub.go +++ b/ws/hub.go @@ -3,6 +3,7 @@ package ws import ( "encoding/json" "log" + "sync" "time" ) @@ -18,10 +19,10 @@ type Info struct { } type Hub struct { - // Registered clients. - clients map[*Client]struct{} + // Registered Clients. + Clients map[*Client]struct{} - // Broadcast messages to all clients. + // Broadcast messages to all Clients. Broadcast chan []byte // Register a new client to the hub. @@ -29,20 +30,25 @@ type Hub struct { // Unregister a client from the hub. Unregister chan *Client + + // lock to prevent write to closed channel + sync.RWMutex } func NewHub() *Hub { return &Hub{ - clients: make(map[*Client]struct{}), + Clients: make(map[*Client]struct{}), Broadcast: make(chan []byte), Register: make(chan *Client), Unregister: make(chan *Client), } } -// NoClients returns the number of clients registered +// NoClients returns the number of Clients registered func (h *Hub) NoClients() int { - return len(h.clients) + h.RLock() + defer h.RUnlock() + return len(h.Clients) } // Run is the main hub event loop handling register, unregister and broadcast events. @@ -50,17 +56,24 @@ func (h *Hub) Run() { for { select { case client := <-h.Register: - h.clients[client] = struct{}{} + h.Lock() + h.Clients[client] = struct{}{} + h.Unlock() + client.Done() case client := <-h.Unregister: - if _, ok := h.clients[client]; ok { - delete(h.clients, client) + h.Lock() + if _, ok := h.Clients[client]; ok { + delete(h.Clients, client) close(client.Send) - go h.SendInfo(h.GetInfo()) // this way the number of clients does not change between calling the goroutine and executing it + go h.SendInfo(h.GetInfo()) // this way the number of Clients does not change between calling the goroutine and executing it } + h.Unlock() case message := <-h.Broadcast: - for client := range h.clients { + h.RLock() + for client := range h.Clients { client.Send <- message } + h.RUnlock() } } } @@ -71,7 +84,7 @@ func (h *Hub) GetInfo() Info { } } -// SendInfo broadcasts hub statistics to all clients. +// SendInfo broadcasts hub statistics to all Clients. func (h *Hub) SendInfo(info Info) { i, err := json.Marshal(info) if err != nil { @@ -85,4 +98,4 @@ func (h *Hub) SendInfo(info Info) { } else { log.Printf("could not marshal ws message: %s", err) } -} \ No newline at end of file +} From 9a7a5ceaca1472e32f008b87ae67b5d4a625c4a8 Mon Sep 17 00:00:00 2001 From: Thorsten Riess Date: Sat, 20 Feb 2021 15:15:39 +0100 Subject: [PATCH 2/2] Fix deadlock on reload --- main.go | 4 ---- ws/client.go | 4 ---- ws/hub.go | 15 ++++++++++----- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/main.go b/main.go index 7578dc7..0d29f00 100644 --- a/main.go +++ b/main.go @@ -205,10 +205,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) { go c.WriteLoop() - c.Add(1) // Add to the hub hub.Register <- c - c.Wait() // wait until registered. // Trickle ICE. Emit server candidate to client peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { @@ -277,7 +275,5 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) { log.Printf("could not marshal ws message: %s", err) } - go hub.SendInfo(hub.GetInfo()) // non-blocking broadcast, required as the read loop is not started yet. - c.ReadLoop() } diff --git a/ws/client.go b/ws/client.go index 1fabb8e..f241bbd 100644 --- a/ws/client.go +++ b/ws/client.go @@ -3,7 +3,6 @@ package ws import ( "encoding/json" "log" - "sync" "time" "github.com/gorilla/websocket" @@ -22,9 +21,6 @@ type Client struct { // webRTC peer connection PeerConnection *webrtc.PeerConnection - - // Wait group - sync.WaitGroup } func NewClient(hub *Hub, conn *websocket.Conn, webrtcConn *webrtc.PeerConnection) *Client { diff --git a/ws/hub.go b/ws/hub.go index c8ef083..b6df381 100644 --- a/ws/hub.go +++ b/ws/hub.go @@ -39,8 +39,8 @@ func NewHub() *Hub { return &Hub{ Clients: make(map[*Client]struct{}), Broadcast: make(chan []byte), - Register: make(chan *Client), - Unregister: make(chan *Client), + Register: make(chan *Client, 1), + Unregister: make(chan *Client, 1), } } @@ -59,15 +59,20 @@ func (h *Hub) Run() { h.Lock() h.Clients[client] = struct{}{} h.Unlock() - client.Done() + go h.SendInfo(h.GetInfo()) case client := <-h.Unregister: - h.Lock() + h.RLock() if _, ok := h.Clients[client]; ok { + h.RUnlock() + h.Lock() delete(h.Clients, client) + h.Unlock() + client.conn.Close() close(client.Send) go h.SendInfo(h.GetInfo()) // this way the number of Clients does not change between calling the goroutine and executing it + } else { + h.RUnlock() } - h.Unlock() case message := <-h.Broadcast: h.RLock() for client := range h.Clients {