Skip to content

Commit

Permalink
Use sync.Map for websocket cancelContext map (minio#3368)
Browse files Browse the repository at this point in the history
  • Loading branch information
cesnietor authored Jun 6, 2024
1 parent 3e83a30 commit cf05d50
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions api/ws_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/minio/console/models"
Expand All @@ -29,15 +30,16 @@ import (

func (wsc *wsMinioClient) objectManager(session *models.Principal) {
// Storage of Cancel Contexts for this connection
cancelContexts := make(map[int64]context.CancelFunc)
var cancelContexts sync.Map
// Initial goroutine
defer func() {
// We close socket at the end of requests
wsc.conn.close()
for _, c := range cancelContexts {
// invoke cancel
c()
}
cancelContexts.Range(func(_, value interface{}) bool {
cancelFunc := value.(context.CancelFunc)
cancelFunc()
return true
})
}()

writeChannel := make(chan WSResponse)
Expand Down Expand Up @@ -80,26 +82,28 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
ctx, cancel := context.WithCancel(context.Background())

// We store the cancel func associated with this request
cancelContexts[messageRequest.RequestID] = cancel
cancelContexts.Store(messageRequest.RequestID, cancel)

const itemsPerBatch = 1000
switch messageRequest.Mode {
case "close":
return
case "cancel":
// if we have that request id, cancel it
if cancelFunc, ok := cancelContexts[messageRequest.RequestID]; ok {
cancelFunc()
delete(cancelContexts, messageRequest.RequestID)
if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok {
cancelFunc.(context.CancelFunc)()
cancelContexts.Delete(messageRequest.RequestID)
}
case "objects":
// cancel all previous open objects requests for listing
for rid, c := range cancelContexts {
cancelContexts.Range(func(key, value interface{}) bool {
rid := key.(int64)
if rid < messageRequest.RequestID {
// invoke cancel
c()
cancelFunc := value.(context.CancelFunc)
cancelFunc()
}
}
return true
})

// start listing and writing to web socket
go func() {
Expand All @@ -118,9 +122,10 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
}
var buffer []ObjectResponse
for lsObj := range startObjectsListing(ctx, wsc.client, objectRqConfigs) {
if cancelContexts[messageRequest.RequestID] == nil {
if _, ok := cancelContexts.Load(messageRequest.RequestID); !ok {
return
}

if lsObj.Err != nil {
sendWSResponse(WSResponse{
RequestID: messageRequest.RequestID,
Expand Down Expand Up @@ -162,16 +167,18 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
})

// remove the cancellation context
delete(cancelContexts, messageRequest.RequestID)
cancelContexts.Delete(messageRequest.RequestID)
}()
case "rewind":
// cancel all previous open objects requests for listing
for rid, c := range cancelContexts {
cancelContexts.Range(func(key, value interface{}) bool {
rid := key.(int64)
if rid < messageRequest.RequestID {
// invoke cancel
c()
cancelFunc := value.(context.CancelFunc)
cancelFunc()
}
}
return true
})

// start listing and writing to web socket
go func() {
Expand Down Expand Up @@ -253,7 +260,7 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) {
})

// remove the cancellation context
delete(cancelContexts, messageRequest.RequestID)
cancelContexts.Delete(messageRequest.RequestID)
}()
}
}
Expand Down

0 comments on commit cf05d50

Please sign in to comment.