From 22176f4e0f855944a3ad05351ac886a39c5d5d73 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 25 Jun 2024 08:50:31 -0700 Subject: [PATCH] fix: objectManager implementation avoid racy goroutines (#3392) fixes #3391 --- api/ws_objects.go | 251 +++++++++++++++++++++++----------------------- 1 file changed, 123 insertions(+), 128 deletions(-) diff --git a/api/ws_objects.go b/api/ws_objects.go index 4f4db04b9d..c7d607fd3f 100644 --- a/api/ws_objects.go +++ b/api/ws_objects.go @@ -35,9 +35,11 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { defer func() { // We close socket at the end of requests wsc.conn.close() - cancelContexts.Range(func(_, value interface{}) bool { + cancelContexts.Range(func(key, value interface{}) bool { cancelFunc := value.(context.CancelFunc) cancelFunc() + + cancelContexts.Delete(key) return true }) }() @@ -55,6 +57,7 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { // Read goroutine go func() { defer close(writeChannel) + for { select { case <-done: @@ -72,10 +75,9 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { // We get request data & review information var messageRequest ObjectsRequest - err := json.Unmarshal(message, &messageRequest) - if err != nil { - LogInfo("Error on message request unmarshal") - return + if err := json.Unmarshal(message, &messageRequest); err != nil { + LogInfo("Error on message request unmarshal", err) + continue } // new message, new context @@ -84,184 +86,177 @@ func (wsc *wsMinioClient) objectManager(session *models.Principal) { // We store the cancel func associated with this request cancelContexts.Store(messageRequest.RequestID, cancel) - const itemsPerBatch = 1000 switch messageRequest.Mode { - case "close": - return - case "cancel": - // if we have that request id, cancel it - if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok { - cancelFunc.(context.CancelFunc)() - cancelContexts.Delete(messageRequest.RequestID) - } - case "objects": + case "objects", "rewind": // cancel all previous open objects requests for listing cancelContexts.Range(func(key, value interface{}) bool { rid := key.(int64) if rid < messageRequest.RequestID { cancelFunc := value.(context.CancelFunc) cancelFunc() + + cancelContexts.Delete(key) } return true }) + } + const itemsPerBatch = 1000 + switch messageRequest.Mode { + case "close": + return + case "cancel": + // if we have that request id, cancel it + if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok { + cancelFunc.(context.CancelFunc)() + cancelContexts.Delete(messageRequest.RequestID) + } + case "objects": // start listing and writing to web socket - go func() { - objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest) - if err != nil { - LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error())) + objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest) + if err != nil { + LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error())) + + sendWSResponse(WSResponse{ + RequestID: messageRequest.RequestID, + Error: ErrorWithContext(ctx, err), + Prefix: messageRequest.Prefix, + BucketName: messageRequest.BucketName, + }) + + return + } + var buffer []ObjectResponse + for lsObj := range startObjectsListing(ctx, wsc.client, objectRqConfigs) { + if lsObj.Err != nil { sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, - Error: ErrorWithContext(ctx, err), + Error: ErrorWithContext(ctx, lsObj.Err), Prefix: messageRequest.Prefix, BucketName: messageRequest.BucketName, }) - return + continue } - var buffer []ObjectResponse - for lsObj := range startObjectsListing(ctx, wsc.client, objectRqConfigs) { - if _, ok := cancelContexts.Load(messageRequest.RequestID); !ok { - return - } - - if lsObj.Err != nil { - sendWSResponse(WSResponse{ - RequestID: messageRequest.RequestID, - Error: ErrorWithContext(ctx, lsObj.Err), - Prefix: messageRequest.Prefix, - BucketName: messageRequest.BucketName, - }) - - continue - } - objItem := ObjectResponse{ - Name: lsObj.Key, - Size: lsObj.Size, - LastModified: lsObj.LastModified.Format(time.RFC3339), - VersionID: lsObj.VersionID, - IsLatest: lsObj.IsLatest, - DeleteMarker: lsObj.IsDeleteMarker, - } - buffer = append(buffer, objItem) - - if len(buffer) >= itemsPerBatch { - sendWSResponse(WSResponse{ - RequestID: messageRequest.RequestID, - Data: buffer, - }) - buffer = nil - } + objItem := ObjectResponse{ + Name: lsObj.Key, + Size: lsObj.Size, + LastModified: lsObj.LastModified.Format(time.RFC3339), + VersionID: lsObj.VersionID, + IsLatest: lsObj.IsLatest, + DeleteMarker: lsObj.IsDeleteMarker, } - if len(buffer) > 0 { + buffer = append(buffer, objItem) + + if len(buffer) >= itemsPerBatch { sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Data: buffer, }) + buffer = nil } - + } + if len(buffer) > 0 { sendWSResponse(WSResponse{ - RequestID: messageRequest.RequestID, - RequestEnd: true, + RequestID: messageRequest.RequestID, + Data: buffer, }) + } - // remove the cancellation context - cancelContexts.Delete(messageRequest.RequestID) - }() - case "rewind": - // cancel all previous open objects requests for listing - cancelContexts.Range(func(key, value interface{}) bool { - rid := key.(int64) - if rid < messageRequest.RequestID { - cancelFunc := value.(context.CancelFunc) - cancelFunc() - } - return true + sendWSResponse(WSResponse{ + RequestID: messageRequest.RequestID, + RequestEnd: true, }) + // if we have that request id, cancel it + if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok { + cancelFunc.(context.CancelFunc)() + cancelContexts.Delete(messageRequest.RequestID) + } + case "rewind": // start listing and writing to web socket - go func() { - objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest) - if err != nil { - LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error())) - sendWSResponse(WSResponse{ - RequestID: messageRequest.RequestID, - Error: ErrorWithContext(ctx, err), - Prefix: messageRequest.Prefix, - BucketName: messageRequest.BucketName, - }) + objectRqConfigs, err := getObjectsOptionsFromReq(messageRequest) + if err != nil { + LogInfo(fmt.Sprintf("Error during Objects OptionsParse %s", err.Error())) + sendWSResponse(WSResponse{ + RequestID: messageRequest.RequestID, + Error: ErrorWithContext(ctx, err), + Prefix: messageRequest.Prefix, + BucketName: messageRequest.BucketName, + }) - return - } + return + } + + clientIP := wsc.conn.remoteAddress() + + s3Client, err := newS3BucketClient(session, objectRqConfigs.BucketName, objectRqConfigs.Prefix, clientIP) + if err != nil { + sendWSResponse(WSResponse{ + RequestID: messageRequest.RequestID, + Error: ErrorWithContext(ctx, err), + Prefix: messageRequest.Prefix, + BucketName: messageRequest.BucketName, + }) + + return + } + + mcS3C := mcClient{client: s3Client} - clientIP := wsc.conn.remoteAddress() + var buffer []ObjectResponse - s3Client, err := newS3BucketClient(session, objectRqConfigs.BucketName, objectRqConfigs.Prefix, clientIP) - if err != nil { + for lsObj := range startRewindListing(ctx, mcS3C, objectRqConfigs) { + if lsObj.Err != nil { sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, - Error: ErrorWithContext(ctx, err), + Error: ErrorWithContext(ctx, lsObj.Err.ToGoError()), Prefix: messageRequest.Prefix, BucketName: messageRequest.BucketName, }) - cancel() - return + continue } - mcS3C := mcClient{client: s3Client} - - var buffer []ObjectResponse - - for lsObj := range startRewindListing(ctx, mcS3C, objectRqConfigs) { - if lsObj.Err != nil { - sendWSResponse(WSResponse{ - RequestID: messageRequest.RequestID, - Error: ErrorWithContext(ctx, lsObj.Err.ToGoError()), - Prefix: messageRequest.Prefix, - BucketName: messageRequest.BucketName, - }) - - continue - } - - name := strings.Replace(lsObj.URL.Path, fmt.Sprintf("/%s/", objectRqConfigs.BucketName), "", 1) - - objItem := ObjectResponse{ - Name: name, - Size: lsObj.Size, - LastModified: lsObj.Time.Format(time.RFC3339), - VersionID: lsObj.VersionID, - IsLatest: lsObj.IsLatest, - DeleteMarker: lsObj.IsDeleteMarker, - } - buffer = append(buffer, objItem) - - if len(buffer) >= itemsPerBatch { - sendWSResponse(WSResponse{ - RequestID: messageRequest.RequestID, - Data: buffer, - }) - buffer = nil - } + name := strings.Replace(lsObj.URL.Path, fmt.Sprintf("/%s/", objectRqConfigs.BucketName), "", 1) + objItem := ObjectResponse{ + Name: name, + Size: lsObj.Size, + LastModified: lsObj.Time.Format(time.RFC3339), + VersionID: lsObj.VersionID, + IsLatest: lsObj.IsLatest, + DeleteMarker: lsObj.IsDeleteMarker, } - if len(buffer) > 0 { + buffer = append(buffer, objItem) + + if len(buffer) >= itemsPerBatch { sendWSResponse(WSResponse{ RequestID: messageRequest.RequestID, Data: buffer, }) + buffer = nil } + } + if len(buffer) > 0 { sendWSResponse(WSResponse{ - RequestID: messageRequest.RequestID, - RequestEnd: true, + RequestID: messageRequest.RequestID, + Data: buffer, }) + } + + sendWSResponse(WSResponse{ + RequestID: messageRequest.RequestID, + RequestEnd: true, + }) - // remove the cancellation context + // if we have that request id, cancel it + if cancelFunc, ok := cancelContexts.Load(messageRequest.RequestID); ok { + cancelFunc.(context.CancelFunc)() cancelContexts.Delete(messageRequest.RequestID) - }() + } } } }