Skip to content

Commit

Permalink
chore: avoid doing some unnecessary work while listing or merging sha…
Browse files Browse the repository at this point in the history
…rded delete requests (backport k240) (#16127)

Co-authored-by: Sandeep Sukhani <[email protected]>
  • Loading branch information
loki-gh-app[bot] and sandeepsukhani authored Feb 6, 2025
1 parent 9b35660 commit 97ddd09
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 28 deletions.
35 changes: 30 additions & 5 deletions pkg/compactor/deletion/delete_requests_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package deletion
import (
"context"
"fmt"
"slices"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -94,8 +96,10 @@ func (d *DeleteRequestsManager) mergeShardedRequests(ctx context.Context) error
return err
}

deletesPerRequest := partitionByRequestID(deleteGroups)
deleteRequests := mergeDeletes(deletesPerRequest)
slices.SortFunc(deleteGroups, func(a, b DeleteRequest) int {
return strings.Compare(a.RequestID, b.RequestID)
})
deleteRequests := mergeDeletes(deleteGroups)
for _, req := range deleteRequests {
// do not consider requests which do not have an id. Request ID won't be set in some tests or there is a bug in our code for loading requests.
if req.RequestID == "" {
Expand All @@ -108,17 +112,38 @@ func (d *DeleteRequestsManager) mergeShardedRequests(ctx context.Context) error
continue
}
// do not do anything if we are not done with processing all the shards or the number of shards is 1
if req.Status != StatusProcessed || len(deletesPerRequest[req.RequestID]) == 1 {
if req.Status != StatusProcessed {
continue
}

var idxStart, idxEnd int
for i := range deleteGroups {
if req.RequestID == deleteGroups[i].RequestID {
idxStart = i
break
}
}

for i := len(deleteGroups) - 1; i > 0; i-- {
if req.RequestID == deleteGroups[i].RequestID {
idxEnd = i
break
}
}

// do not do anything if the number of shards is 1
if idxStart == idxEnd {
continue
}
reqShards := deleteGroups[idxStart : idxEnd+1]

level.Info(util_log.Logger).Log("msg", "merging sharded request",
"request_id", req.RequestID,
"num_shards", len(deletesPerRequest),
"num_shards", len(reqShards),
"start_time", req.StartTime.Unix(),
"end_time", req.EndTime.Unix(),
)
if err := d.deleteRequestsStore.MergeShardedRequests(ctx, req, deletesPerRequest[req.RequestID]); err != nil {
if err := d.deleteRequestsStore.MergeShardedRequests(ctx, req, reqShards); err != nil {
return err
}
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/compactor/deletion/delete_requests_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,14 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu

func (ds *deleteRequestsStore) deleteRequestsWithDetails(ctx context.Context, partialDeleteRequests []DeleteRequest) ([]DeleteRequest, error) {
deleteRequests := make([]DeleteRequest, 0, len(partialDeleteRequests))
for _, group := range partitionByRequestID(partialDeleteRequests) {
for _, deleteRequest := range group {
requestWithDetails, err := ds.queryDeleteRequestDetails(ctx, deleteRequest)
if err != nil {
return nil, err
}
deleteRequests = append(deleteRequests, requestWithDetails)
for _, deleteRequest := range partialDeleteRequests {
requestWithDetails, err := ds.queryDeleteRequestDetails(ctx, deleteRequest)
if err != nil {
return nil, err
}
deleteRequests = append(deleteRequests, requestWithDetails)
}

return deleteRequests, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/compactor/deletion/grpc_request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ func (g *GRPCRequestHandler) GetDeleteRequests(ctx context.Context, _ *grpc.GetD
return nil, err
}

deletesPerRequest := partitionByRequestID(deleteGroups)
deleteRequests := mergeDeletes(deletesPerRequest)
deleteRequests := mergeDeletes(deleteGroups)

sort.Slice(deleteRequests, func(i, j int) bool {
return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt
Expand Down
27 changes: 21 additions & 6 deletions pkg/compactor/deletion/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"math"
"net/http"
"net/url"
"slices"
"sort"
"strings"
"time"

"github.com/go-kit/log/level"
Expand Down Expand Up @@ -141,8 +143,7 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
return
}

deletesPerRequest := partitionByRequestID(deleteGroups)
deleteRequests := mergeDeletes(deletesPerRequest)
deleteRequests := mergeDeletes(deleteGroups)

sort.Slice(deleteRequests, func(i, j int) bool {
return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt
Expand All @@ -155,17 +156,31 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
}
}

func mergeDeletes(groups map[string][]DeleteRequest) []DeleteRequest {
func mergeDeletes(reqs []DeleteRequest) []DeleteRequest {
if len(reqs) <= 1 {
return reqs
}
slices.SortFunc(reqs, func(a, b DeleteRequest) int {
return strings.Compare(a.RequestID, b.RequestID)
})
mergedRequests := []DeleteRequest{} // Declare this way so the return value is [] rather than null
for _, deletes := range groups {
startTime, endTime, status := mergeData(deletes)
newDelete := deletes[0]
// find the start and end of shards of same request and merge them
i := 0
for j := 0; j < len(reqs); j++ {
// if this is not the last request in the list and the next request belongs to same shard then keep looking further
if j < len(reqs)-1 && reqs[i].RequestID == reqs[j+1].RequestID {
continue
}
startTime, endTime, status := mergeData(reqs[i : j+1])
newDelete := reqs[i]
newDelete.StartTime = startTime
newDelete.EndTime = endTime
newDelete.Status = status

mergedRequests = append(mergedRequests, newDelete)
i = j + 1
}

return mergedRequests
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/compactor/deletion/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,3 @@ func deleteModeFromLimits(l Limits, userID string) (deletionmode.Mode, error) {
mode := l.DeletionMode(userID)
return deletionmode.ParseMode(mode)
}

func partitionByRequestID(reqs []DeleteRequest) map[string][]DeleteRequest {
groups := make(map[string][]DeleteRequest)
for _, req := range reqs {
groups[req.RequestID] = append(groups[req.RequestID], req)
}
return groups
}

0 comments on commit 97ddd09

Please sign in to comment.