@@ -41,7 +41,7 @@ func newWebsocketPump(
4141 frontend : frontend ,
4242 backend : backend ,
4343 logger : logger ,
44- done : make (chan struct {}),
44+ done : make (chan struct {}, 1 ),
4545 }
4646}
4747
@@ -93,10 +93,19 @@ func (p *websocketPump) run() error {
9393}
9494
9595func (p * websocketPump ) stop () error {
96- p .done <- struct {}{}
96+ if ! p .active .Load () {
97+ return nil // already stopped
98+ }
9799
98100 errs := make ([]error , 0 )
99101
102+ select {
103+ case p .done <- struct {}{}:
104+ // no-op
105+ default :
106+ errs = append (errs , errors .New ("double-closing the pump" ))
107+ }
108+
100109 countdown := 60
101110 for p .active .Load () && countdown > 0 {
102111 time .Sleep (time .Second )
@@ -130,8 +139,19 @@ func (p *websocketPump) pumpMessages(
130139 )
131140
132141 messages := make (chan * websocketMessage , 16 )
133- doneReads := make (chan struct {})
134- doneWrites := make (chan struct {})
142+ doneReads := make (chan struct {}, 1 )
143+ doneWrites := make (chan struct {}, 1 )
144+
145+ notifyOnFailure := func (err error ) {
146+ select {
147+ case failure <- err :
148+ // no-op
149+ default :
150+ l .Warn ("Dropping websocket pump failure b/c the channel is full" ,
151+ zap .Error (err ),
152+ )
153+ }
154+ }
135155
136156 go func () { // read
137157 for {
@@ -141,13 +161,13 @@ func (p *websocketPump) pumpMessages(
141161
142162 default :
143163 if err := from .SetReadDeadline (utils .Deadline (timeout )); err != nil {
144- failure <- err
164+ notifyOnFailure ( err )
145165 continue
146166 }
147167
148168 msgType , bytes , err := from .ReadMessage ()
149169 if err != nil {
150- failure <- err
170+ notifyOnFailure ( err )
151171 continue
152172 }
153173
@@ -213,12 +233,12 @@ func (p *websocketPump) pumpMessages(
213233 }
214234
215235 if err := to .SetWriteDeadline (utils .Deadline (timeout )); err != nil {
216- failure <- err
236+ notifyOnFailure ( err )
217237 continue
218238 }
219239
220240 if err := to .WriteMessage (m .msgType , m .bytes ); err != nil {
221- failure <- err
241+ notifyOnFailure ( err )
222242 continue
223243 }
224244
0 commit comments