Skip to content

Commit

Permalink
Merge pull request #78 from a-narsudinov/master
Browse files Browse the repository at this point in the history
Fix the concurrency issues in `removeObjectsOneByOne`
  • Loading branch information
vitalif authored Feb 12, 2024
2 parents c065096 + 6a7b88e commit 700508e
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions pkg/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"net/url"
"sync/atomic"

"github.com/golang/glog"
"github.com/minio/minio-go/v7"
Expand Down Expand Up @@ -170,11 +171,11 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error {
// will delete files one by one without file lock
func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
parallelism := 16
objectsCh := make(chan minio.ObjectInfo, 1)
objectsCh := make(chan minio.ObjectInfo, parallelism)
guardCh := make(chan int, parallelism)
var listErr error
totalObjects := 0
removeErrors := 0
var totalObjects int64 = 0
var removeErrors int64 = 0

go func() {
defer close(objectsCh)
Expand All @@ -185,7 +186,7 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {
listErr = object.Err
return
}
totalObjects++
atomic.AddInt64(&totalObjects, 1)
objectsCh <- object
}
}()
Expand All @@ -197,15 +198,15 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error {

for object := range objectsCh {
guardCh <- 1
go func() {
err := client.minio.RemoveObject(client.ctx, bucketName, object.Key,
minio.RemoveObjectOptions{VersionID: object.VersionID})
go func(obj minio.ObjectInfo) {
err := client.minio.RemoveObject(client.ctx, bucketName, obj.Key,
minio.RemoveObjectOptions{VersionID: obj.VersionID})
if err != nil {
glog.Errorf("Failed to remove object %s, error: %s", object.Key, err)
removeErrors++
glog.Errorf("Failed to remove object %s, error: %s", obj.Key, err)
atomic.AddInt64(&removeErrors, 1)
}
<- guardCh
}()
}(object)
}
for i := 0; i < parallelism; i++ {
guardCh <- 1
Expand Down

0 comments on commit 700508e

Please sign in to comment.