Skip to content

Commit

Permalink
handle closed chan and break loop on error
Browse files Browse the repository at this point in the history
avoiding goroutine leakage
  • Loading branch information
lostbean committed Jan 4, 2024
1 parent 9606080 commit 75667b0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
4 changes: 2 additions & 2 deletions engine/server/engine/server/websocket_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func streamStarlarkLogsWithWebsocket[T any](ctx echo.Context, cors cors.Cors, st
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"streamerUUID": streamerUUID,
}).Error("Failed to stream all data")
}).Warn("Failed to stream all data")
}
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func streamServiceLogsWithWebsocket(ctx echo.Context, cors cors.Cors, streamer s
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"services": streamer.GetRequestedServiceUuids(),
}).Error("Failed to stream all data")
}).Warn("Failed to stream all data")
}
}

Expand Down
56 changes: 39 additions & 17 deletions engine/server/engine/streaming/websocket_pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,44 +112,45 @@ func (pump *WebsocketPump[T]) startPumping() {
// we also need a dummy reader loop.
go pump.readLoop()

WRITE_LOOP:
for {
select {
case <-ticker.C:
if err := pump.websocket.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
logrus.Debug("Websocket connection did not meet the write deadline")
logrus.Error("Websocket connection did not meet the write deadline")
pump.connectionError = &err
return
break WRITE_LOOP
}
if err := pump.websocket.WriteMessage(websocket.PingMessage, nil); err != nil {
logrus.Debug("Websocket connection is likely closed, exiting keep alive process")
logrus.Error("Websocket connection is likely closed, exiting keep alive process")
pump.connectionError = &err
return
break WRITE_LOOP
}
case msg := <-pump.inputChan:
if err := pump.websocket.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
logrus.Debug("Websocket connection did not meet the write deadline")
logrus.Error("Websocket connection did not meet the write deadline")
pump.connectionError = &err
return
break WRITE_LOOP
}
if err := pump.websocket.WriteJSON(msg); err != nil {
logrus.WithError(stacktrace.Propagate(err, "Failed to send value of type `%T` via websocket", msg)).Errorf("Failed to write message to websocket, closing it.")
logrus.WithError(err).Warnf("Failed to send value of type `%T` via websocket", msg)
pump.connectionError = &err
return
break WRITE_LOOP
}
case msg := <-pump.infoChan:
if err := pump.websocket.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
logrus.Debug("Websocket connection did not meet the write deadline")
logrus.Error("Websocket connection did not meet the write deadline")
pump.connectionError = &err
return
break WRITE_LOOP
}
if err := pump.websocket.WriteJSON(msg); err != nil {
logrus.WithError(stacktrace.Propagate(err, "Failed to send value of type `%T` via websocket", msg)).Errorf("Failed to write message to websocket, closing it.")
logrus.WithError(err).Warnf("Failed to send value of type `%T` via websocket", msg)
pump.connectionError = &err
return
break WRITE_LOOP
}
case <-pump.ctx.Done():
logrus.Debug("Websocket pump has been asked to close, closing it.")
return
break WRITE_LOOP
}
}
}
Expand All @@ -158,24 +159,45 @@ func (pump *WebsocketPump[T]) PumpResponseInfo(msg *api_type.ResponseInfo) error
if pump.closed {
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return nil
}

select {
case _, ok := <-pump.infoChan:
if !ok {
logrus.Debug("Worker channel closed, cannot send message")
}
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return stacktrace.NewError("Websocket has been closed due connection error")
case pump.infoChan <- msg:
return nil
}
pump.infoChan <- msg
return nil
}

func (pump *WebsocketPump[T]) PumpMessage(msg *T) error {
if pump.closed {
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return nil
}

select {
case _, ok := <-pump.inputChan:
if !ok {
logrus.Debug("Worker channel closed, cannot send message")
}
if pump.connectionError != nil {
return stacktrace.Propagate(*pump.connectionError, "Websocket has been closed due connection error")
}
return stacktrace.NewError("Websocket has been closed due connection error")
case pump.inputChan <- msg:
return nil
}
pump.inputChan <- msg
return nil

}

func (pump *WebsocketPump[T]) Close() {
Expand Down

0 comments on commit 75667b0

Please sign in to comment.