Skip to content

Commit d290009

Browse files
authored
Merge pull request kubernetes-sigs#2741 from headlamp-k8s/web-lock-fix
backend: enhance WSConnLock with thread-safety
2 parents a0806b2 + 19cd3e6 commit d290009

File tree

2 files changed

+473
-290
lines changed

2 files changed

+473
-290
lines changed

backend/cmd/multiplexer.go

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type Connection struct {
6363
// Status is the status of the connection.
6464
Status ConnectionStatus
6565
// Client is the WebSocket connection to the client.
66-
Client *websocket.Conn
66+
Client *WSConnLock
6767
// Done is a channel to signal when the connection is done.
6868
Done chan struct{}
6969
// mu is a mutex to synchronize access to the connection.
@@ -104,6 +104,68 @@ type Multiplexer struct {
104104
kubeConfigStore kubeconfig.ContextStore
105105
}
106106

107+
// WSConnLock provides a thread-safe wrapper around a WebSocket connection.
108+
// It ensures that write operations are synchronized using a mutex to prevent
109+
// concurrent writes which could corrupt the WebSocket stream.
110+
type WSConnLock struct {
111+
// conn is the underlying WebSocket connection
112+
conn *websocket.Conn
113+
// writeMu is a mutex to synchronize access to write operations.
114+
// This prevents concurrent writes to the WebSocket connection.
115+
writeMu sync.Mutex
116+
}
117+
118+
// NewWSConnLock creates a new WSConnLock instance that wraps the provided
119+
// WebSocket connection with thread-safe write operations.
120+
func NewWSConnLock(conn *websocket.Conn) *WSConnLock {
121+
return &WSConnLock{
122+
conn: conn,
123+
writeMu: sync.Mutex{},
124+
}
125+
}
126+
127+
// WriteJSON writes the JSON encoding of v as a message to the WebSocket connection.
128+
// It ensures thread-safety by using a mutex lock during the write operation.
129+
func (conn *WSConnLock) WriteJSON(v interface{}) error {
130+
conn.writeMu.Lock()
131+
defer conn.writeMu.Unlock()
132+
133+
return conn.conn.WriteJSON(v)
134+
}
135+
136+
// ReadJSON reads the next JSON-encoded message from the WebSocket connection
137+
// and stores it in the value pointed to by v.
138+
// Note: Reading is already thread-safe in gorilla/websocket, so no mutex is needed.
139+
func (conn *WSConnLock) ReadJSON(v interface{}) error {
140+
return conn.conn.ReadJSON(v)
141+
}
142+
143+
// ReadMessage reads the next message from the WebSocket connection.
144+
// It returns the message type and payload.
145+
// Note: Reading is already thread-safe in gorilla/websocket, so no mutex is needed.
146+
func (conn *WSConnLock) ReadMessage() (messageType int, p []byte, err error) {
147+
return conn.conn.ReadMessage()
148+
}
149+
150+
// WriteMessage writes a message to the WebSocket connection with the given type and payload.
151+
// It ensures thread-safety by using a mutex lock during the write operation.
152+
func (conn *WSConnLock) WriteMessage(messageType int, data []byte) error {
153+
conn.writeMu.Lock()
154+
defer conn.writeMu.Unlock()
155+
156+
return conn.conn.WriteMessage(messageType, data)
157+
}
158+
159+
// Close safely closes the WebSocket connection.
160+
// It ensures thread-safety by acquiring the write mutex before closing,
161+
// preventing any concurrent writes during the close operation.
162+
func (conn *WSConnLock) Close() error {
163+
conn.writeMu.Lock()
164+
defer conn.writeMu.Unlock()
165+
166+
return conn.conn.Close()
167+
}
168+
107169
// NewMultiplexer creates a new Multiplexer instance.
108170
func NewMultiplexer(kubeConfigStore kubeconfig.ContextStore) *Multiplexer {
109171
return &Multiplexer{
@@ -183,7 +245,7 @@ func (m *Multiplexer) establishClusterConnection(
183245
userID,
184246
path,
185247
query string,
186-
clientConn *websocket.Conn,
248+
clientConn *WSConnLock,
187249
) (*Connection, error) {
188250
config, err := m.getClusterConfigWithFallback(clusterID, userID)
189251
if err != nil {
@@ -246,7 +308,7 @@ func (m *Multiplexer) createConnection(
246308
userID,
247309
path,
248310
query string,
249-
clientConn *websocket.Conn,
311+
clientConn *WSConnLock,
250312
) *Connection {
251313
return &Connection{
252314
ClusterID: clusterID,
@@ -355,6 +417,8 @@ func (m *Multiplexer) HandleClientWebSocket(w http.ResponseWriter, r *http.Reque
355417

356418
defer clientConn.Close()
357419

420+
lockClientConn := NewWSConnLock(clientConn)
421+
358422
for {
359423
msg, err := m.readClientMessage(clientConn)
360424
if err != nil {
@@ -368,9 +432,9 @@ func (m *Multiplexer) HandleClientWebSocket(w http.ResponseWriter, r *http.Reque
368432
continue
369433
}
370434

371-
conn, err := m.getOrCreateConnection(msg, clientConn)
435+
conn, err := m.getOrCreateConnection(msg, lockClientConn)
372436
if err != nil {
373-
m.handleConnectionError(clientConn, msg, err)
437+
m.handleConnectionError(lockClientConn, msg, err)
374438

375439
continue
376440
}
@@ -408,7 +472,7 @@ func (m *Multiplexer) readClientMessage(clientConn *websocket.Conn) (Message, er
408472
}
409473

410474
// getOrCreateConnection gets an existing connection or creates a new one if it doesn't exist.
411-
func (m *Multiplexer) getOrCreateConnection(msg Message, clientConn *websocket.Conn) (*Connection, error) {
475+
func (m *Multiplexer) getOrCreateConnection(msg Message, clientConn *WSConnLock) (*Connection, error) {
412476
connKey := m.createConnectionKey(msg.ClusterID, msg.Path, msg.UserID)
413477

414478
m.mutex.RLock()
@@ -437,7 +501,7 @@ func (m *Multiplexer) getOrCreateConnection(msg Message, clientConn *websocket.C
437501
}
438502

439503
// handleConnectionError handles errors that occur when establishing a connection.
440-
func (m *Multiplexer) handleConnectionError(clientConn *websocket.Conn, msg Message, err error) {
504+
func (m *Multiplexer) handleConnectionError(clientConn *WSConnLock, msg Message, err error) {
441505
errorMsg := struct {
442506
ClusterID string `json:"clusterId"`
443507
Error string `json:"error"`
@@ -477,7 +541,7 @@ func (m *Multiplexer) writeMessageToCluster(conn *Connection, data []byte) error
477541
}
478542

479543
// handleClusterMessages handles messages from a cluster connection.
480-
func (m *Multiplexer) handleClusterMessages(conn *Connection, clientConn *websocket.Conn) {
544+
func (m *Multiplexer) handleClusterMessages(conn *Connection, clientConn *WSConnLock) {
481545
defer m.cleanupConnection(conn)
482546

483547
var lastResourceVersion string
@@ -497,7 +561,7 @@ func (m *Multiplexer) handleClusterMessages(conn *Connection, clientConn *websoc
497561
// processClusterMessage processes a single message from the cluster.
498562
func (m *Multiplexer) processClusterMessage(
499563
conn *Connection,
500-
clientConn *websocket.Conn,
564+
clientConn *WSConnLock,
501565
lastResourceVersion *string,
502566
) error {
503567
messageType, message, err := conn.WSConn.ReadMessage()
@@ -541,7 +605,7 @@ func (m *Multiplexer) processClusterMessage(
541605
func (m *Multiplexer) sendIfNewResourceVersion(
542606
message []byte,
543607
conn *Connection,
544-
clientConn *websocket.Conn,
608+
clientConn *WSConnLock,
545609
lastResourceVersion *string,
546610
) error {
547611
var obj map[string]interface{}
@@ -581,7 +645,7 @@ func (m *Multiplexer) sendIfNewResourceVersion(
581645
}
582646

583647
// sendCompleteMessage sends a COMPLETE message to the client.
584-
func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *websocket.Conn) error {
648+
func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *WSConnLock) error {
585649
conn.mu.RLock()
586650
if conn.closed {
587651
conn.mu.RUnlock()
@@ -614,7 +678,7 @@ func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *websocke
614678
// sendDataMessage sends the actual data message to the client.
615679
func (m *Multiplexer) sendDataMessage(
616680
conn *Connection,
617-
clientConn *websocket.Conn,
681+
clientConn *WSConnLock,
618682
messageType int,
619683
message []byte,
620684
) error {

0 commit comments

Comments
 (0)