diff --git a/pkg/compactor/deletion/delete_requests_manager.go b/pkg/compactor/deletion/delete_requests_manager.go index cd7bc5fad8fbe..9391d50801b07 100644 --- a/pkg/compactor/deletion/delete_requests_manager.go +++ b/pkg/compactor/deletion/delete_requests_manager.go @@ -3,7 +3,9 @@ package deletion import ( "context" "fmt" + "slices" "sort" + "strings" "sync" "time" @@ -94,8 +96,10 @@ func (d *DeleteRequestsManager) mergeShardedRequests(ctx context.Context) error return err } - deletesPerRequest := partitionByRequestID(deleteGroups) - deleteRequests := mergeDeletes(deletesPerRequest) + slices.SortFunc(deleteGroups, func(a, b DeleteRequest) int { + return strings.Compare(a.RequestID, b.RequestID) + }) + deleteRequests := mergeDeletes(deleteGroups) for _, req := range deleteRequests { // do not consider requests which do not have an id. Request ID won't be set in some tests or there is a bug in our code for loading requests. if req.RequestID == "" { @@ -108,17 +112,38 @@ func (d *DeleteRequestsManager) mergeShardedRequests(ctx context.Context) error continue } // do not do anything if we are not done with processing all the shards or the number of shards is 1 - if req.Status != StatusProcessed || len(deletesPerRequest[req.RequestID]) == 1 { + if req.Status != StatusProcessed { + continue + } + + var idxStart, idxEnd int + for i := range deleteGroups { + if req.RequestID == deleteGroups[i].RequestID { + idxStart = i + break + } + } + + for i := len(deleteGroups) - 1; i > 0; i-- { + if req.RequestID == deleteGroups[i].RequestID { + idxEnd = i + break + } + } + + // do not do anything if the number of shards is 1 + if idxStart == idxEnd { continue } + reqShards := deleteGroups[idxStart : idxEnd+1] level.Info(util_log.Logger).Log("msg", "merging sharded request", "request_id", req.RequestID, - "num_shards", len(deletesPerRequest), + "num_shards", len(reqShards), "start_time", req.StartTime.Unix(), "end_time", req.EndTime.Unix(), ) - if err := d.deleteRequestsStore.MergeShardedRequests(ctx, req, deletesPerRequest[req.RequestID]); err != nil { + if err := d.deleteRequestsStore.MergeShardedRequests(ctx, req, reqShards); err != nil { return err } } diff --git a/pkg/compactor/deletion/delete_requests_store.go b/pkg/compactor/deletion/delete_requests_store.go index af962862f39f3..405ffef08cb64 100644 --- a/pkg/compactor/deletion/delete_requests_store.go +++ b/pkg/compactor/deletion/delete_requests_store.go @@ -292,15 +292,14 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu func (ds *deleteRequestsStore) deleteRequestsWithDetails(ctx context.Context, partialDeleteRequests []DeleteRequest) ([]DeleteRequest, error) { deleteRequests := make([]DeleteRequest, 0, len(partialDeleteRequests)) - for _, group := range partitionByRequestID(partialDeleteRequests) { - for _, deleteRequest := range group { - requestWithDetails, err := ds.queryDeleteRequestDetails(ctx, deleteRequest) - if err != nil { - return nil, err - } - deleteRequests = append(deleteRequests, requestWithDetails) + for _, deleteRequest := range partialDeleteRequests { + requestWithDetails, err := ds.queryDeleteRequestDetails(ctx, deleteRequest) + if err != nil { + return nil, err } + deleteRequests = append(deleteRequests, requestWithDetails) } + return deleteRequests, nil } diff --git a/pkg/compactor/deletion/grpc_request_handler.go b/pkg/compactor/deletion/grpc_request_handler.go index bf68c397043b4..038c154b31ecd 100644 --- a/pkg/compactor/deletion/grpc_request_handler.go +++ b/pkg/compactor/deletion/grpc_request_handler.go @@ -45,8 +45,7 @@ func (g *GRPCRequestHandler) GetDeleteRequests(ctx context.Context, _ *grpc.GetD return nil, err } - deletesPerRequest := partitionByRequestID(deleteGroups) - deleteRequests := mergeDeletes(deletesPerRequest) + deleteRequests := mergeDeletes(deleteGroups) sort.Slice(deleteRequests, func(i, j int) bool { return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt diff --git a/pkg/compactor/deletion/request_handler.go b/pkg/compactor/deletion/request_handler.go index 45d5db0918385..448ab7a78dd8e 100644 --- a/pkg/compactor/deletion/request_handler.go +++ b/pkg/compactor/deletion/request_handler.go @@ -7,7 +7,9 @@ import ( "math" "net/http" "net/url" + "slices" "sort" + "strings" "time" "github.com/go-kit/log/level" @@ -141,8 +143,7 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite return } - deletesPerRequest := partitionByRequestID(deleteGroups) - deleteRequests := mergeDeletes(deletesPerRequest) + deleteRequests := mergeDeletes(deleteGroups) sort.Slice(deleteRequests, func(i, j int) bool { return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt @@ -155,17 +156,31 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite } } -func mergeDeletes(groups map[string][]DeleteRequest) []DeleteRequest { +func mergeDeletes(reqs []DeleteRequest) []DeleteRequest { + if len(reqs) <= 1 { + return reqs + } + slices.SortFunc(reqs, func(a, b DeleteRequest) int { + return strings.Compare(a.RequestID, b.RequestID) + }) mergedRequests := []DeleteRequest{} // Declare this way so the return value is [] rather than null - for _, deletes := range groups { - startTime, endTime, status := mergeData(deletes) - newDelete := deletes[0] + // find the start and end of shards of same request and merge them + i := 0 + for j := 0; j < len(reqs); j++ { + // if this is not the last request in the list and the next request belongs to same shard then keep looking further + if j < len(reqs)-1 && reqs[i].RequestID == reqs[j+1].RequestID { + continue + } + startTime, endTime, status := mergeData(reqs[i : j+1]) + newDelete := reqs[i] newDelete.StartTime = startTime newDelete.EndTime = endTime newDelete.Status = status mergedRequests = append(mergedRequests, newDelete) + i = j + 1 } + return mergedRequests } diff --git a/pkg/compactor/deletion/util.go b/pkg/compactor/deletion/util.go index c20da8a4a2602..b71102a4c8715 100644 --- a/pkg/compactor/deletion/util.go +++ b/pkg/compactor/deletion/util.go @@ -34,11 +34,3 @@ func deleteModeFromLimits(l Limits, userID string) (deletionmode.Mode, error) { mode := l.DeletionMode(userID) return deletionmode.ParseMode(mode) } - -func partitionByRequestID(reqs []DeleteRequest) map[string][]DeleteRequest { - groups := make(map[string][]DeleteRequest) - for _, req := range reqs { - groups[req.RequestID] = append(groups[req.RequestID], req) - } - return groups -}